package client

import (
	"context"
	"fmt"
	etcd_client "github.com/rpcxio/rpcx-etcd/client"
	"github.com/smallnest/rpcx/client"
	rpcxLog "github.com/smallnest/rpcx/log"
	"github.com/smallnest/rpcx/protocol"
	"gogame/gameconfig"
	"gogame/logger"
	"gogame/pb"
	"gogame/server/rpcx"
	"gogame/server/rpcx/param"
	"net"
	"time"
)

const (
	UnStart   = iota // 未启动
	Run              // 启动
	RetryDial        // 尝试连接
)

type RpcXClient struct {
	rpcx.RpcXStruct
	State         int64                       // 状态 0未启动 1启动 -1尝试连接
	RpcClient     client.XClient              // rpcClient 链接对象
	msgHandleNum  uint16                      // 处理rpcXServer推送消息的数量
	msgChan       chan *protocol.Message      // 接收rpcXServer推送消息
	msgHandleFunc func(msg *protocol.Message) // 处理rpcXServer推送消息的func
	timeout       time.Duration               // 超时
	Selector      *Selector                   // 自定义服务选择
}

// NewRpcXClient 实例化NewRpcXClient
func NewRpcXClient(listen, serviceName string, msgFunc func(msg *protocol.Message)) *RpcXClient {
	_rpcXClient := &RpcXClient{
		State: UnStart,
		RpcXStruct: rpcx.RpcXStruct{
			BasePath:    rpcx.GetClientBasePath(serviceName),
			Type:        "rpcClient",
			ServiceName: serviceName,
			Name:        fmt.Sprintf("rpcClient[%s]", listen),
			Address:     listen,
		},
		msgHandleNum:  2,
		msgChan:       make(chan *protocol.Message, 1000),
		msgHandleFunc: msgFunc,
		timeout:       60,
	}
	_rpcXClient.Selector = NewSelector(_rpcXClient)
	return _rpcXClient
}

// SetMsgHandleNum 设置rpcxServer服务消息处理数
func (r *RpcXClient) SetMsgHandleNum(num uint16) {
	r.msgHandleNum = num
}

func (r *RpcXClient) dial() bool {
	defer func() {
		if rec := recover(); rec != nil {
			errMsg := fmt.Sprintf(
				"rpcx.client Dial 【%s】 fial!! retrying... err-->%v",
				r.ServiceName,
				rec,
			)
			logger.GatewayLog.Warn(errMsg)
		}
	}()

	d, err := etcd_client.NewEtcdV3Discovery(r.BasePath, r.ServiceName, gameconfig.ServerConfig.EtcdUrl, true, nil)
	if err != nil {
		panic(fmt.Sprintf("NewXClient failed to call: %v", err))
	}
	r.RpcClient = client.NewBidirectionalXClient(r.ServiceName, client.Failtry, client.RandomSelect, d, client.DefaultOption, r.msgChan)
	// 中间件
	AddPlugin(r)
	// 自定义选择器
	r.RpcClient.SetSelector(r.Selector)

	// 处理rpcXServer推送的消息
	for i := uint16(0); i < r.msgHandleNum; i++ {
		go r.DoMessage()
	}

	r.State = Run
	fmt.Println()

	return true
}

// Dial 链接worker
func (r *RpcXClient) Dial(async bool) {
	// 禁止rpcx自带log
	rpcxLog.SetDummyLogger()

	_dial := func() {
		if r.dial() {
			return
		}

		r.State = RetryDial
		_ticker := time.NewTicker(time.Second * 2)
		defer _ticker.Stop()
		// 尝试连接，直到成功为止
		for {
			select {
			case <-_ticker.C:
				if r.dial() {
					return
				}
			}
		}
	}

	if async {
		go _dial()
	} else {
		_dial()
	}

}

// DoMessage 服务端消息处理
func (r *RpcXClient) DoMessage() {
	for msg := range r.msgChan {
		if msg.Metadata != nil {
			_, ok := msg.Metadata[protocol.ServiceError]
			// rpcXServer远端断开
			if ok {
				_serverUrl := msg.Metadata["server"]
				logger.GatewayLog.Warn(fmt.Sprintf("【%s】 to rpcXServer【%s】断开", r.Name, _serverUrl))
				r.Selector.DelServer(_serverUrl)
				continue
			}
		}

		if r.msgHandleFunc != nil {
			r.msgHandleFunc(msg)
		} else {
			logger.GatewayLog.Warn(fmt.Sprintf("rpcxClient【%s】 msgHandleFunc is nil", r.Address))
		}

	}
}

// ClientConnectionClose client连接被断开 暂时没用
func (r *RpcXClient) ClientConnectionClose(conn net.Conn) error {
	return nil
}

// Close 链接断开
func (r *RpcXClient) Close() {
	err := r.RpcClient.Close()
	if err != nil {
		fmt.Println("rpcXClient Close err-->", err)
	}
	close(r.msgChan)
}

// getTimeOutCtx 获取一个超时ctx
func (r *RpcXClient) getTimeOutCtx() (context.Context, context.CancelFunc) {
	_ctx, _cancel := context.WithTimeout(context.Background(), time.Second*r.timeout)

	return _ctx, _cancel
}

// Notice 通知-异步
func (r *RpcXClient) Notice(methodName string, request any) {
	_ctx, _ := r.getTimeOutCtx()
	_, _err := r.RpcClient.Go(_ctx, methodName, request, nil, nil)
	if _err != nil {
		fmt.Printf("rpcXClient Notice error!! -->%s\n", _err)
	}
}

// Call 同步
func (r *RpcXClient) Call(methodName string, request any, res any, callFunc GatewayCallFunc) error {
	_ctx, _ := r.getTimeOutCtx()
	_err := r.RpcClient.Call(_ctx, methodName, request, res)

	// 回调
	if callFunc != nil {
		var _errMsg string
		if _err != nil {
			_errMsg = fmt.Sprintf("rpcXClient call error->%s", _err)
		}
		callFunc(res, _errMsg)
	}

	return _err
}

type GatewayCallFunc func(msg any, err string)

// CallBack 异步回调
func (r *RpcXClient) CallBack(methodName string, request *pb.ApiRequest, response *rpcx.Response, callFunc GatewayCallFunc) error {
	_ctx, _ := r.getTimeOutCtx()
	_waitChan, _err := r.RpcClient.Go(_ctx, methodName, request, response, nil)
	if _err != nil {
		return _err
	}

	go func() {
		select {
		// 等待结果返回
		case _waitRes := <-_waitChan.Done:
			var _errMsg string
			if _waitRes.Error != nil {
				_errMsg = fmt.Sprintf("rpcXClient CallBack _waitRes error!! -->%s", _waitRes.Error)
			}

			// 回调
			if callFunc != nil {
				callFunc(response, _errMsg)
			}

		// 超时
		case <-_ctx.Done():
			logger.GatewayLog.Warn(fmt.Sprintf("rpcXClient CallBack timeout!! methodName:%s request:%s", methodName, request))
		}

		// 资源回收
		pb.PutApiRequest(request)

	}()

	return nil
}

// Fork 广播
func (r *RpcXClient) Fork(ctx context.Context, methodName string, args any, res *param.AnyRes) error {
	return r.RpcClient.Fork(ctx, methodName, args, res)
}
